package com.godenwater.recv.server.sl651;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.text.SimpleDateFormat;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.godenwater.recv.manager.MessageManager;
import com.godenwater.recv.manager.MonitorManager;
import com.godenwater.recv.manager.SessionManager;
import com.godenwater.recv.manager.TaskManager;
import com.godenwater.recv.model.CommonMessage;
import com.godenwater.recv.utils.TaskEngine;

import cn.gov.mwr.sl651.HydroBuilder;
import cn.gov.mwr.sl651.IMessage;
import cn.gov.mwr.sl651.Symbol;
import cn.gov.mwr.sl651.command.DownCommand;
import cn.gov.mwr.sl651.utils.ByteUtil;

import com.godenwater.core.container.Module;

/**
 * Singleton entry point of the SL651 receiving server.
 * <p>
 * The server loads a fixed set of {@link Module} implementations, initializes
 * and starts them, owns three bounded in-memory queues (original messages,
 * combined messages and processing-log lines) and offers a small console
 * command loop through {@link #main(String[])}.
 */
public class HydroServer {

	private static final Logger logger = LoggerFactory.getLogger(HydroServer.class);

	/** Console text printed whenever a typed command cannot be turned into a function code. */
	private static final String UNKNOWN_COMMAND = "未知指令，请重新发送正确指令或输入HELP进行指令查看.";

	/** Wall-clock time (ms) recorded at construction and refreshed by {@link #initialize()} / {@link #start()}. */
	private long startDate = System.currentTimeMillis();

	private static HydroServer instance;

	/** Set by {@link #start()} and cleared by {@link #stop()}; volatile so other threads calling {@link #running()} see updates. */
	private volatile boolean RUNNING = false;

	// ----------------------------------------------------------------------
	/**
	 * All modules loaded by this server, keyed by their implementation class,
	 * in load order.
	 */
	private Map<Class<?>, Module> modules = new LinkedHashMap<Class<?>, Module>();

	private ClassLoader loader;

	// Queue of original (raw) messages; bounded to 1000 entries.
	private BlockingQueue<CommonMessage> queueOriginal = new LinkedBlockingQueue<CommonMessage>(
			1000);

	// Queue of combined messages; bounded to 1000 entries.
	private BlockingQueue<IMessage> queueCombin = new LinkedBlockingQueue<IMessage>(
			1000);

	// Queue of message-processing log lines; bounded to 1000 entries.
	private BlockingQueue<String> queueLogs = new LinkedBlockingQueue<String>(
			1000);

	// Fixed pool of 10 worker threads. No task is submitted from this class
	// at present; stop() shuts the pool down.
	ExecutorService service = Executors.newFixedThreadPool(10);

	// Message counter, advanced by messageSize().
	public final AtomicInteger wc = new AtomicInteger();

	/** Maximum number of transmitted messages. */
	public static final int MAX_SERIAL = 65535;

	// NOTE(review): not referenced anywhere in this class. SimpleDateFormat is
	// not thread-safe, so do not share this instance across threads if it is
	// ever put to use.
	private static SimpleDateFormat sdf = new SimpleDateFormat("yyMMddHHmmss");

	/**
	 * Returns the singleton instance of HydroServer, creating it on first use.
	 * <p>
	 * Synchronized so that concurrent first callers cannot race into the
	 * constructor, which rejects a second instantiation.
	 *
	 * @return the single server instance.
	 */
	public static synchronized HydroServer getInstance() {
		if (instance == null) {
			instance = new HydroServer();
		}
		return instance;
	}

	/**
	 * Creates the server object. The server is not started here; call
	 * {@link #start()}.
	 *
	 * @throws IllegalStateException if an instance already exists.
	 */
	private HydroServer() {
		if (instance != null) {
			throw new IllegalStateException(">> RTU server is already running");
		}
		instance = this;
	}

	/**
	 * Records the start time and captures the context class loader of the
	 * calling thread, which {@link #loadModule(String)} uses to load modules.
	 */
	public void initialize() {
		startDate = System.currentTimeMillis();
		loader = Thread.currentThread().getContextClassLoader();
	}

	/**
	 * Loads the built-in modules in a fixed order. Loading of
	 * StationCacheManager and StationManager is currently disabled.
	 */
	private void loadModules() {
		loadModule(ConnectionManager.class.getName()); // connection manager
		loadModule(SessionManager.class.getName()); // session manager
		loadModule(ProcotolManager.class.getName()); // protocol information
		loadModule(MessageManager.class.getName()); // message manager
		loadModule(TaskManager.class.getName()); // task manager
		loadModule(MonitorManager.class.getName()); // monitor terminal manager
	}

	/**
	 * Loads one module class by name, instantiates it through its no-argument
	 * constructor and registers it. A failure is logged and the module is
	 * skipped; it does not abort server start-up.
	 *
	 * @param module fully qualified class name of a {@link Module}
	 *               implementation.
	 */
	private void loadModule(String module) {
		System.out.println(">> Load " + module + "...");
		try {
			Class<?> modClass = loader.loadClass(module);
			Module mod = (Module) modClass.newInstance();
			this.modules.put(modClass, mod);
		} catch (Exception e) {
			logger.error("admin.error", e);
		}
	}

	/**
	 * Initializes every registered module. A module whose
	 * {@code initialize()} throws is logged and unregistered.
	 */
	private void initModules() {
		// Removal goes through the iterator: removing from the map directly
		// while iterating its values() view makes the next iteration step
		// throw ConcurrentModificationException.
		Iterator<Module> it = modules.values().iterator();
		while (it.hasNext()) {
			Module module = it.next();
			try {
				module.initialize();
			} catch (Exception e) {
				it.remove();
				logger.error("initModules.error", e);
			}
		}
	}

	/**
	 * Starts every registered module. A module whose {@code start()} throws is
	 * logged; it stays registered and the remaining modules are still started.
	 */
	private void startModules() {
		for (Module module : modules.values()) {
			try {
				module.start();
			} catch (Exception e) {
				logger.error("startModules.error", e);
			}
		}
	}

	/** @return the registered {@link SessionManager}, or {@code null} if none is registered. */
	public SessionManager getSessionManager() {
		return (SessionManager) modules.get(SessionManager.class);
	}

	/** @return the registered {@link MonitorManager}, or {@code null} if none is registered. */
	public MonitorManager getMonitorManager() {
		return (MonitorManager) modules.get(MonitorManager.class);
	}

	/** @return the registered {@link MessageManager}, or {@code null} if none is registered. */
	public MessageManager getMessageManager() {
		return (MessageManager) modules.get(MessageManager.class);
	}

	/** @return the registered {@link ConnectionManager}, or {@code null} if none is registered. */
	public ConnectionManager getConnectionManager() {
		return (ConnectionManager) modules.get(ConnectionManager.class);
	}

	/** @return the registered {@link ProcotolManager}, or {@code null} if none is registered. */
	public ProcotolManager getProcotolManager() {
		return (ProcotolManager) modules.get(ProcotolManager.class);
	}

	/**
	 * Returns the same module as {@link #getMonitorManager()}.
	 *
	 * @return the registered {@link MonitorManager}, or {@code null} if none is registered.
	 */
	public MonitorManager getLogManager() {
		return (MonitorManager) modules.get(MonitorManager.class);
	}

	/**
	 * Starts the server: initializes it, then loads, initializes and starts
	 * all modules and marks the server as running. Any exception is logged
	 * and reported on the console instead of being propagated.
	 */
	public void start() {
		try {
			initialize();
			loadModules();
			initModules();
			startModules();

			RUNNING = true;

			// NOTE: the message/combine/database consumers that used to be
			// submitted to the thread pool here are currently disabled.

			System.out.println("RTU SERVER >> 水文水资源数据平台服务启动完成.");

			startDate = System.currentTimeMillis();
		} catch (Exception e) {
			logger.error(e.getMessage(), e);
			System.out.println("RTU SERVER >> 水文水资源数据平台.error");
		}
	}

	/**
	 * Stops the server: clears the running flag, waits, shuts the thread pool
	 * down, waits again, then stops and destroys every module and shuts the
	 * {@link TaskEngine} down.
	 * <p>
	 * If no module is registered the method returns after the pool shutdown,
	 * on the assumption that the server has already been stopped. A module
	 * that fails to stop is logged and does not prevent the remaining modules
	 * from being stopped.
	 */
	public void stop() {
		RUNNING = false;
		pause(3 * 1000L);

		service.shutdown();
		System.out.println("正在关闭处理服务，请等待....");
		pause(3 * 1000L);

		// If we don't have modules then the server has already been shutdown
		if (modules.isEmpty()) {
			return;
		}
		for (Module module : modules.values()) {
			try {
				module.stop();
				module.destroy();
			} catch (Exception e) {
				logger.error("Failed to stop module " + module.getClass().getName(), e);
			}
		}
		modules.clear();

		TaskEngine.getInstance().shutdown();

		System.out.println("RTU SERVER >> Shutting down...");
	}

	/**
	 * Sleeps for the given time. If the thread is interrupted the sleep ends
	 * early and the interrupt flag is restored for the caller.
	 *
	 * @param millis time to sleep in milliseconds.
	 */
	private static void pause(long millis) {
		try {
			Thread.sleep(millis);
		} catch (InterruptedException e) {
			Thread.currentThread().interrupt();
			logger.warn("Interrupted while waiting", e);
		}
	}

	/** @return {@code true} between a successful {@link #start()} and the next {@link #stop()}. */
	public boolean running() {
		return RUNNING;
	}

	/**
	 * Checks whether the database is healthy. The actual check is currently
	 * disabled, so this always reports success.
	 *
	 * @return always {@code true}.
	 */
	public boolean verifyDb() {
		return true;
	}

	/** @return the start time in milliseconds since the epoch. */
	public long getStartDate() {
		return startDate;
	}

	/**
	 * Takes the head of a queue, blocking until an element is available. If
	 * the thread is interrupted while waiting, the interruption is logged,
	 * the interrupt flag is restored and {@code null} is returned.
	 *
	 * @param queue the queue to take from.
	 * @return the element taken, or {@code null} if interrupted.
	 */
	private static <T> T takeOrNull(BlockingQueue<T> queue) {
		try {
			return queue.take();
		} catch (InterruptedException e) {
			Thread.currentThread().interrupt();
			logger.error("", e);
			return null;
		}
	}

	/**
	 * Enqueues an original message without blocking.
	 *
	 * @param message the message to enqueue.
	 * @return {@code false} if the queue is full, {@code true} otherwise.
	 */
	public boolean queuePush(CommonMessage message) {
		return this.queueOriginal.offer(message);
	}

	/**
	 * Dequeues an original message, blocking until one is available.
	 *
	 * @return the message, or {@code null} if the thread was interrupted
	 *         while waiting (the interrupt flag is then set).
	 */
	public CommonMessage queuePoll() {
		return takeOrNull(this.queueOriginal);
	}

	/**
	 * @return the number of original messages currently queued.
	 */
	public int queueSize() {
		return this.queueOriginal.size();
	}

	/**
	 * Enqueues a combined message without blocking.
	 *
	 * @param message the message to enqueue.
	 * @return {@code false} if the queue is full, {@code true} otherwise.
	 */
	public boolean queueCombinPush(IMessage message) {
		return this.queueCombin.offer(message);
	}

	/**
	 * Dequeues a combined message, blocking until one is available.
	 *
	 * @return the message, or {@code null} if the thread was interrupted
	 *         while waiting (the interrupt flag is then set).
	 */
	public IMessage queueCombinPoll() {
		return takeOrNull(this.queueCombin);
	}

	/**
	 * @return the number of combined messages currently queued.
	 */
	public int queueCombinSize() {
		return this.queueCombin.size();
	}

	/**
	 * Exposes the processing-log queue itself (not a copy).
	 *
	 * @return the live log queue.
	 */
	public BlockingQueue<String> queueLogs() {
		return this.queueLogs;
	}

	/**
	 * Enqueues a processing-log line without blocking.
	 *
	 * @param message the log line to enqueue.
	 * @return {@code false} if the queue is full, {@code true} otherwise.
	 */
	public boolean queueLogsPush(String message) {
		return this.queueLogs.offer(message);
	}

	/**
	 * Dequeues a processing-log line, blocking until one is available.
	 *
	 * @return the log line, or {@code null} if the thread was interrupted
	 *         while waiting (the interrupt flag is then set).
	 */
	public String queueLogsPoll() {
		return takeOrNull(this.queueLogs);
	}

	/**
	 * Advances the message counter {@link #wc} by one and returns the new
	 * value. When the counter has reached {@link Integer#MAX_VALUE} it wraps
	 * around and the call returns 1.
	 *
	 * @return the counter value after the increment.
	 */
	public int messageSize() {
		// Compare-and-set loop so that the wrap-around check and the
		// increment form one atomic step.
		for (;;) {
			int current = wc.get();
			int next = (current == Integer.MAX_VALUE) ? 1 : current + 1;
			if (wc.compareAndSet(current, next)) {
				return next;
			}
		}
	}

	/**
	 * Hook for saving a message that could not be processed. Currently does
	 * nothing.
	 *
	 * @param message the faulty message.
	 */
	protected void saveErrMessage(IMessage message) {

	}

	/**
	 * Builds a downlink command from a function code typed as a hex string
	 * and broadcasts it through the {@link SessionManager}.
	 * <p>
	 * Blank input is ignored. Input that cannot be converted into at least
	 * one byte, or whose first byte is not a known function code, prints the
	 * "unknown command" hint. Function codes 0x3A, 0x40-0x43 and 0x49 are
	 * recognized but no message is built for them yet, so nothing is sent.
	 * The center address, station address and password are fixed values.
	 *
	 * @param command function code as a hex string, e.g. {@code "37"}; may be
	 *                {@code null}.
	 */
	protected void sendCommand(String command) {
		String code = StringUtils.trim(command);
		if (!StringUtils.isNotEmpty(code)) {
			return;
		}

		byte[] funcCode;
		try {
			funcCode = ByteUtil.HexStringToBinary(code);
		} catch (RuntimeException e) {
			// The text comes straight from the console; a malformed value
			// must not terminate the command loop.
			logger.warn("Cannot parse command as hex: " + code, e);
			funcCode = null;
		}
		if (funcCode == null || funcCode.length == 0) {
			System.out.println(UNKNOWN_COMMAND);
			return;
		}

		DownCommand cmd = new DownCommand();
		cmd.setBodyStartBit(Symbol.STX);
		cmd.setEof(Symbol.ENQ);

		cmd.setCenterAddr(ByteUtil.HexStringToBinary("FF"));
		cmd.setStationAddr(ByteUtil.HexStringToBinary("0012345678"));
		cmd.setPassword(ByteUtil.HexStringToBinary("FFFF"));

		IMessage message = null;

		switch (funcCode[0]) {
		case 0x36: // query picture acquisition information
		case 0x37: // query real-time data of the telemetry station
		case 0x39: // query manually entered data report
		case 0x44: // query real-time working data of the pump motor
		case 0x45: // query telemetry station software version
		case 0x46: // query telemetry station status and alarm information
		case 0x47: // initialize solid-state storage data
		case 0x48: // restore telemetry station factory settings
		case 0x4A: // set telemetry station clock
		case 0x50: // center station queries telemetry station event records
		case 0x51: // center station queries telemetry station clock
			cmd.setFuncCode(funcCode);
			message = cmd.send37Message(0, null);
			break;
		case 0x38: // query period data report
			cmd.setFuncCode(funcCode);
			message = cmd.send38Message(1, null, "13010108", "13030108",
					"000600", "F0,1F,20");
			break;
		case 0x3A: // query real-time data of specified elements (not built yet)
			cmd.setFuncCode(funcCode);
			break;
		case 0x40: // modify basic configuration table (not built yet)
		case 0x42: // modify running-parameter configuration table (not built yet)
			cmd.setFuncCode(funcCode);
			break;
		case 0x41: // read basic configuration table (not built yet)
		case 0x43: // read running-parameter configuration table (not built yet)
			cmd.setFuncCode(funcCode);
			break;
		case 0x49: // change password (not built yet)
			cmd.setFuncCode(funcCode);
			break;
		default:
			System.out.println(UNKNOWN_COMMAND);
		}

		if (message == null) {
			return;
		}

		SessionManager sessions = getSessionManager();
		if (sessions == null) {
			// The module failed to load or the server has been stopped.
			logger.error("SessionManager is not available; command 0x" + code + " was not sent");
			return;
		}

		byte[] tmpMessage = HydroBuilder.toByte(message, cmd.getUpDown());
		sessions.broadcast(tmpMessage);
		System.out.println("---------中心站查询遥测站-------功能码：0x" + code
				+ "---------");
		System.out.println("byte "+ ByteUtil.toHexString(tmpMessage));
		System.out.println("cmd  "+cmd.printHexString( message));
		System.out
				.println("----------------------------------------------------------");
	}

	/**
	 * Starts the server and runs an interactive console on standard input.
	 * <p>
	 * Recognized commands: {@code exit}/{@code quit} (stop the server and
	 * exit the JVM), {@code help}, {@code client} (no action), {@code data}
	 * (print the original-message queue size). Anything else is passed to
	 * {@link #sendCommand(String)}. When standard input reaches end of
	 * stream the console loop ends and the server is left running.
	 *
	 * @param args ignored.
	 */
	public static void main(String[] args) {
		HydroServer server = HydroServer.getInstance();
		server.start();

		BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
		try {
			System.out.print("CMD>>");
			String cmd;
			// readLine() returns null at end of stream (e.g. stdin closed or
			// redirected); stop reading instead of dereferencing null.
			while ((cmd = br.readLine()) != null) {
				if (cmd.equalsIgnoreCase("exit")
						|| cmd.equalsIgnoreCase("quit")) {
					server.stop();
					pause(2000L);
					System.exit(0);
					return;
				} else if (cmd.equalsIgnoreCase("help")) {
					HydroHelp.printHelp();
				} else if (cmd.equalsIgnoreCase("client")) {
					// no action implemented for this command
				} else if (cmd.equalsIgnoreCase("data")) {
					HydroHelp.printData(server.queueSize());
				} else {
					server.sendCommand(cmd);
				}

				System.out.print("CMD>>");
			}
			logger.info("Standard input closed; console command loop ended");
		} catch (IOException e) {
			System.out.println("系统错误！");
			logger.error("Console read failed", e);
		} finally {
			try {
				// Closing the BufferedReader also closes the wrapped reader.
				br.close();
			} catch (IOException e) {
				System.out.println("关闭流发生错误！");
				logger.error("Failed to close console reader", e);
			}
		}
	}

}
